# -*- coding: utf-8 -*-
from com.zsw.config.CSVReader import CSVReader
from com.zsw.config.JSONReader import JSONReader
from com.zsw.config.MySQLReader import MySQLReader, MySQLWriter
from com.zsw.config.SparkBase import SparkBase
from com.zsw.main.DataCleaner import DataCleaner

# JDBC settings shared by the source reader and the target writer.
_JDBC_URL = "jdbc:mysql://192.168.1.164:3306/xy_tmp"
_JDBC_USER = "root"
_JDBC_DRIVER = "com.mysql.jdbc.Driver"

# NOTE(review): the reader and the writer connect to the same URL as the same
# user but were configured with two different passwords ("govent" vs
# "govnet"). One of them is very likely a typo. Both are kept exactly as they
# were because the correct value cannot be determined from this file; confirm
# and collapse them into one value (ideally loaded from configuration or the
# environment instead of being hard-coded here).
_SOURCE_PASSWORD = "govent"
_TARGET_PASSWORD = "govnet"


def _jdbc_properties(password):
    """Return the JDBC connection properties dict for the given password."""
    return {"user": _JDBC_USER, "password": password, "driver": _JDBC_DRIVER}


def main():
    """Read the CSV, JSON and MySQL sources, clean them and write the result.

    Each source is read through its reader, passed through
    ``DataCleaner.clean``, and the three cleaned results are unioned and
    written to the ``cleaned_data`` table. ``SparkBase.stop_spark()`` is
    called in a ``finally`` clause, so it runs whether or not the read,
    clean or write steps raise.
    """
    # 1. Set up the readers.
    csv_reader = CSVReader("file:///E:/pythonProject/pyspark-demo/pythonProject1/com/zsw/data/data.csv")
    json_reader = JSONReader("file:///E:/pythonProject/pyspark-demo/pythonProject1/com/zsw/data/data.json")
    mysql_reader = MySQLReader(
        url=_JDBC_URL,
        table="source_table",
        properties=_jdbc_properties(_SOURCE_PASSWORD),
    )

    try:
        # 2. Read the data.
        df_csv = csv_reader.read()
        df_json = json_reader.read()
        df_mysql = mysql_reader.read()

        # 3. Clean the data.
        df_clean_csv = DataCleaner.clean(df_csv)
        df_clean_json = DataCleaner.clean(df_json)
        df_clean_mysql = DataCleaner.clean(df_mysql)

        # 4. Write to the target MySQL table.
        mysql_writer = MySQLWriter(
            url=_JDBC_URL,
            table="cleaned_data",
            properties=_jdbc_properties(_TARGET_PASSWORD),
        )

        # NOTE(review): if these are Spark DataFrames, union() matches columns
        # by position, not by name. This presumably relies on
        # DataCleaner.clean() producing the same column order for all three
        # sources - confirm, or consider unionByName().
        combined = df_clean_csv.union(df_clean_json).union(df_clean_mysql)
        mysql_writer.write(combined)
    finally:
        # The return value was previously bound to an unused variable.
        SparkBase.stop_spark()


if __name__ == "__main__":
    main()


